package com.joysuccess.thread;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author : zhangqing
 * @Description: 线程池拒绝策略
 * @date : 2020年06月22日 10:52
 */
public class JoyIgnorePolicy implements RejectedExecutionHandler {

    static Properties properties;
    static final String TOPIC_NAME="thread-ignore-task-topic";
    static KafkaProducer<String,String> kafkaProducer;
    static {
        properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092");
        properties.put("acks","1");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducer = new KafkaProducer(properties);
    }

    /**
     * 拒绝策略将任务写到Kafka的消息队列中
     * @param r
     * @param executor
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//        sendToMQ(r);
    }

    private void sendToMQ(Runnable r) {
        ProducerRecord<String,String> producerRecord = new ProducerRecord(TOPIC_NAME,r);
        kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception != null) {
                    System.out.println("------------" + exception.getCause() + "任务：" + r);
                }else {
                    System.out.println("=============" + metadata.partition() + ":" +metadata.offset());
                }
            }
        });
    }
}
